"""
This code aggregates the processed data from "01_prep_data.py" to several levels of observation (monthly, daily, and hourly)
to assist with analysis. It also merges in the demographic and billing data. This code was written with Python 3.

by: Jacob Malone (j.malone@cablelabs.com)
date: August 25, 2021
"""

import datetime as dt
from glob import glob
from time import ctime

import pandas as pd


def convert_stata_dates(num_days):
    """Translate a Stata daily date into a ``datetime.date``.

    Stata daily dates count days elapsed since its epoch, 1 January 1960, so the
    calendar date is the epoch shifted forward by ``num_days`` days.
    """
    stata_epoch = dt.date(1960, 1, 1)
    offset = dt.timedelta(days=num_days)
    return stata_epoch + offset


# Define the working directory and list the processed per-market files from "01_prep_data.py".
data_dir = '/mnt/et01/cord_cutting_rep'
# glob() returns files in arbitrary, filesystem-dependent order; sort them so markets are
# processed, and stacked into the rollup files, in a reproducible order.
market_files = sorted(glob(f'{data_dir}/*_processed_May2020.parquet'))

# Demographic data, read from demo.csv relative to the current working directory.
demo_frame = pd.read_csv('demo.csv')


def load_billing_data(csv_dir='/mnt/et01/mso/new_flow_2015/csv', customer_keys=None):
    """Load the billing extracts we need to merge in, restricted to the customers of interest.

    Reads ``billinfo-file1.csv`` ... ``billinfo-file6.csv`` from ``csv_dir``, keeps only
    the columns in ``bill_vars_to_load`` and the rows whose ``customer_key`` is in
    ``customer_keys``, and converts ``date`` from a Stata daily date to ``datetime64``.

    Parameters
    ----------
    csv_dir : str, optional
        Directory containing the billing CSV extracts (defaults to the original
        hard-coded location).
    customer_keys : array-like, optional
        Customer keys to keep. Defaults to the unique ``customer_key`` values of the
        module-level ``demo_frame``.

    Returns
    -------
    pandas.DataFrame
        Billing rows from all extracts, in file order, with a fresh 0..n-1 index.
    """
    billing_files = ['billinfo-file1',
                     'billinfo-file2',
                     'billinfo-file3',
                     'billinfo-file4',
                     'billinfo-file5',
                     'billinfo-file6']
    bill_vars_to_load = ['customer_key',
                         'date',
                         'monthly_amt',  # Total bill amount
                         'vid_flag',  # Indicator if they had MSO TV service
                         'tel_flag',  # Indicator if they had telephone service
                         'hs_flag',  # Indicator if they had home security service
                         'tier',  # Internet tier details
                         'node_key']  # Network node information

    if customer_keys is None:
        customer_keys = demo_frame.customer_key.unique()

    # Filter each extract as it is read, rather than after stacking them all, to keep
    # peak memory down. Concatenate once at the end instead of growing a DataFrame inside
    # the loop, which re-copies every previously read row on each pass.
    pieces = []
    for name in billing_files:
        part = pd.read_csv(f'{csv_dir}/{name}.csv', header=0, usecols=bill_vars_to_load)
        pieces.append(part.loc[part.customer_key.isin(customer_keys)])
    df_bill = pd.concat(pieces, ignore_index=True)

    # Stata daily date -> datetime.date -> datetime64[ns]. The infer_datetime_format
    # argument was dropped: it only affects string parsing and is deprecated in pandas 2.0.
    df_bill['date'] = pd.to_datetime(df_bill['date'].apply(convert_stata_dates))

    return df_bill


# Create the billing DataFrame, then collapse it to one row per customer for the monthly
# and hourly files. The bill amount, service flags and tier take their maximum over all of
# the customer's billing rows; node_key keeps the first non-null value.
billing_frame = load_billing_data()
billing_frame_max = (
    billing_frame
    .groupby(['customer_key'], sort=True)
    .agg({'monthly_amt': 'max',
          'vid_flag': 'max',
          'tel_flag': 'max',
          # Previously 'chs_flag', a column load_billing_data() never loads, which made
          # agg() raise KeyError. The loaded home-security flag is 'hs_flag'.
          'hs_flag': 'max',
          'tier': 'max',
          'node_key': 'first'})
    .reset_index()
)

# Usage columns collapsed in every output file: summed for the monthly and daily files,
# averaged for the hourly file. Built once here instead of on every loop iteration.
reduce_sum_columns = ['tot_gbdn',
                      'tot_gbup',
                      'level_admin',
                      'level_backup',
                      'level_cdn',
                      'level_gaming',
                      'level_music',
                      'level_sharing',
                      'level_streaming',
                      'level_tunnel',
                      'level_other',
                      'level_vidamazon',
                      'level_vidflash',
                      'level_vidhbo',
                      'level_vidhulu',
                      'level_vidnetflix',
                      'level_vidyoutube',
                      'level_viddish',
                      'level_vidslingtv',
                      'level_vidother',
                      'level_video',
                      'tot_gb',
                      'level_browsinstagram',
                      'level_browsamazon',
                      'level_browsapple',
                      'level_browscnn',
                      'level_browsespn',
                      'level_browsgoogle',
                      'level_browshttp',
                      'level_browshttpdl',
                      'level_browsfbook',
                      'level_browsadult',
                      'level_browsother',
                      'level_browsing']
monthly_collapse_dict = {col: 'sum' for col in reduce_sum_columns}
hourly_collapse_dict = {col: 'mean' for col in reduce_sum_columns}


def _aggregate_and_write(frame, group_keys, collapse_dict, billing, billing_keys, out_file):
    """Collapse `frame` by `group_keys`, attach demographics and billing data, and write a parquet file.

    Both merges are inner joins, so only groups whose customer appears in ``demo_frame``
    and, on ``billing_keys``, in ``billing`` are written to ``out_file``.
    """
    collapsed = frame.groupby(group_keys).agg(collapse_dict).reset_index()
    collapsed = collapsed.merge(demo_frame, on=['customer_key'], how='inner')
    collapsed = collapsed.merge(billing, on=billing_keys, how='inner')
    collapsed.to_parquet(out_file)


monthly_files = []
hourly_files = []
daily_files = []
for lf in market_files:
    print(f'Processing {lf}.... {ctime()}')

    # Load file into memory
    frame = pd.read_parquet(lf)

    # Reduce rows to those flagged 'IPDR & DPI' in merge1, i.e. where we have complete information.
    # Boolean .loc indexing already returns a new frame, and frame is not mutated afterwards,
    # so no extra .copy() is needed.
    frame = frame.loc[frame.merge1 == 'IPDR & DPI']

    # Output files are named after the input file's base name (text before its first '.').
    stem = lf.split("/")[-1].split(".")[0]

    # Create monthly file: one row per customer, with the per-customer billing collapse.
    print(f'- Creating monthly file ... {ctime()}')
    out_file = f'{data_dir}/{stem}_monthly.parquet'
    monthly_files.append(out_file)
    _aggregate_and_write(frame, 'customer_key', monthly_collapse_dict,
                         billing_frame_max, ['customer_key'], out_file)

    # Create hourly file: per customer and hour, each usage column is averaged over the rows
    # in that group (presumably across the days in the file -- TODO confirm).
    print(f'- Creating hourly file ... {ctime()}')
    out_file = f'{data_dir}/{stem}_hourly.parquet'
    hourly_files.append(out_file)
    _aggregate_and_write(frame, ['customer_key', 'hr'], hourly_collapse_dict,
                         billing_frame_max, ['customer_key'], out_file)

    # Create daily file: per customer and date, joined to the un-collapsed billing rows on
    # (customer_key, date). NOTE(review): this assumes frame.date has the same dtype as the
    # datetime64 billing date -- confirm, otherwise the inner join silently drops rows.
    print(f'- Creating daily file ... {ctime()}')
    out_file = f'{data_dir}/{stem}_daily.parquet'
    daily_files.append(out_file)
    _aggregate_and_write(frame, ['customer_key', 'date'], monthly_collapse_dict,
                         billing_frame, ['customer_key', 'date'], out_file)


# Aggregate the per-market files of each level into a single rollup file.
# ignore_index=True: every per-market file carries its own 0..n-1 index, so a plain concat
# would produce duplicate index labels, which to_parquet would then persist.
for level, level_files in (('monthly', monthly_files),
                           ('hourly', hourly_files),
                           ('daily', daily_files)):
    print(f'- Creating {level} rollup file ... {ctime()}')
    rollup = pd.concat([pd.read_parquet(path) for path in level_files], ignore_index=True)
    rollup.to_parquet(f'{data_dir}/03_{level}_rollup.parquet')
